package org.example.model;

import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;

import java.io.IOException;
import java.util.Date;

/**
 *  这里填写类yapi分类名称
 */
/**
 * Output format for Spark RDD saves that partitions records into per-key
 * subdirectories and, unlike the stock {@code FileOutputFormat}, tolerates a
 * pre-existing output directory (the usual "output directory already exists"
 * check is intentionally skipped in {@link #checkOutputSpecs}).
 */
public class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat<String, String> {
    /**
     * Builds the relative output path for a single record.
     *
     * @param key   the RDD record's key; used both as the subdirectory name
     *              and as the file-name prefix
     * @param value the RDD record's value (not used in the path)
     * @param name  the default leaf name derived from the reducer number
     *              (not used in the path)
     * @return a path of the form {@code <key>/<key>-<timestampMillis>.csv};
     *         the millisecond timestamp keeps file names unique across jobs,
     *         since Spark overwrites files whose names collide in an
     *         existing directory
     */
    @Override
    protected String generateFileNameForKeyValue(String key, String value,
                                                 String name) {
        // System.currentTimeMillis() replaces the legacy new Date().getTime()
        return key + "/" + key + "-" + System.currentTimeMillis() + ".csv";
    }

    /**
     * Validates the job's output specification.
     *
     * <p>Identical to the inherited check except that it does NOT fail when
     * the output directory already exists, so Spark can write additional
     * files into a directory produced by a previous run.
     *
     * @param ignored unused legacy parameter from the old mapred API
     * @param job     the job configuration holding the output path
     * @throws InvalidJobConfException if reducers are configured but no
     *                                 output directory is set
     * @throws IOException             on filesystem or token-acquisition errors
     */
    @Override
    public void checkOutputSpecs(FileSystem ignored, JobConf job)
            throws FileAlreadyExistsException, InvalidJobConfException,
            IOException {
        Path outDir = getOutputPath(job);
        if (outDir == null && job.getNumReduceTasks() != 0) {
            throw new InvalidJobConfException(
                    "Output directory not set in JobConf.");
        }
        if (outDir != null) {
            FileSystem fs = outDir.getFileSystem(job);
            // Normalize the output directory and store the qualified form back.
            outDir = fs.makeQualified(outDir);
            setOutputPath(job, outDir);
            // Obtain a delegation token for the output directory's filesystem
            // so tasks on secure clusters can write to it.
            TokenCache.obtainTokensForNamenodes(job.getCredentials(),
                    new Path[] { outDir }, job);
            // Deliberately no fs.exists(outDir) check here: the stock
            // implementation would throw FileAlreadyExistsException, but this
            // format allows Spark to output into an existing directory.
        }
    }
}
